Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: transport package #1748

Merged
merged 1 commit into from
Jan 12, 2017
Merged

raft: transport package #1748

merged 1 commit into from
Jan 12, 2017

Conversation

LK4D4
Copy link
Contributor

@LK4D4 LK4D4 commented Nov 14, 2016

I'm trying to come up with a smaller testable package for raft transport to simplify membership.Cluster and have better coverage. It's just a start, and it doesn't pass the linters or whatever for now. Will appreciate any feedback.
ping @aaronlehmann


func (p *peer) stop() {
close(p.stopped)
<-p.done
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this could block for a long time if p.msgc has a lot of messages enqueued. Maybe processLoop should check if p.stopped is closed before selecting on p.msgc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it would make sense to reuse WithContext here, and closing p.stopped would cause the context to be cancelled.

case <-p.stopped:
return errors.New("peer stopped")
case <-ctx.Done():
return ctx.Err()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<-p.stopped and <-ctx.Done should be checked in a different select from p.msgc <- m. Otherwise the p.msgc <-m branch can be randomly chosen.

for _, e := range errs {
errStr += "\n" + e.Error()
}
return errors.Errorf("errors occured during Send: %s", errStr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume these eventually end up in the log. I think it's a lot cleaner to just log each error separately. Logging something containing newlines is pretty bad.

p.active = false
p.mu.Unlock()
}
}()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this defer is necessary at all. If the message queue is full, then most likely the node is down and sendProcessMessage will call ReportUnreachable. If the context is cancelled or p.stopped is closed, I don't think calling ReportUnreachable is appropriate.

if err != nil {
return err
}
p, err := newPeer(m.To, addr, t)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's problematic to use a peer that isn't tracked in the t.peers list. For example, that peer might call ReportUnreachable after raft has shut down.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Nov 15, 2016

@aaronlehmann PTAL. I'm still not sure how to handle message to unknown peer properly. Now I just create peer for it, which might not be desirable on Send. Other solution would be to have separate goroutine which sends messages to unknown peers. Let me know what do you think.

@aaronlehmann
Copy link
Collaborator

Now I just create peer for it, which might not be desirable on Send

What is the downside?

@codecov-io
Copy link

codecov-io commented Nov 15, 2016

Current coverage is 55.00% (diff: 65.15%)

Merging #1748 into master will increase coverage by 0.18%

@@             master      #1748   diff @@
==========================================
  Files           103        105     +2   
  Lines         17250      17518   +268   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits           9456       9636   +180   
- Misses         6649       6724    +75   
- Partials       1145       1158    +13   

Sunburst

Powered by Codecov. Last update fef7386...79a5679

@LK4D4
Copy link
Contributor Author

LK4D4 commented Nov 15, 2016

@aaronlehmann it calls Dial (we probably need to call health check there as well) which might block sending messages for some time.

@aaronlehmann
Copy link
Collaborator

Yeah, that's not good. If a dedicated goroutine for sending messages to unknown peers would solve the problem, it's worth considering.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Nov 16, 2016

@aaronlehmann PTAL
I've moved unknown senders to separate goroutine and embedded context more deeply. Also added healthcheck on peer add.

require.NoError(t, c.Add(2))

// set channel to nil to emulate full queue
c.Get(1).tr.peers[2].msgc = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data race here


// unknownSender sends messages to unknown peers. It creates new peer for each
// message and discards it after send.
func (t *Transport) unknownSender(ctx context.Context) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this goroutine is giving us anything over the original goroutine-per-unknown-send approach. The problem I had with the original approach is that there was no way to make sure the goroutine was done before shutting down raft, but this seems to have the same problem.

Maybe going back to the original approach of spawning a goroutine every time we need to talk to an unknown sender, plus a wait group that run waits on, would do the trick?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need to be sure that the raft is up in this case. ReportUnreachable and ReportSnapshot is no-op in those cases. Cancelling request should be enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can move this to run goroutine, so Done will wait until unknown stuff is processed as well.

@LK4D4 LK4D4 force-pushed the raft_transport branch 2 times, most recently from 8d6fc76 to 542aad9 Compare November 16, 2016 15:29
@LK4D4
Copy link
Contributor Author

LK4D4 commented Nov 16, 2016

@aaronlehmann PTAL, now you can wait on Done until everything is finished.

@aaronlehmann
Copy link
Collaborator

That looks much better, thanks.

p.tr.config.Raft.ReportSnapshot(m.To, raft.SnapshotFailure)
}
p.tr.config.Raft.ReportUnreachable(m.To)
if grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight change to this in #1779 that should be replicated here.

case <-ctx.Done():
return ctx.Err()
case <-t.ctx.Done():
return ctx.Err()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return t.ctx.Err() ?

@@ -539,6 +539,8 @@ func ContextErr(err error) StreamError {
case context.Canceled:
return streamErrorf(codes.Canceled, "%v", err)
}
fmt.Printf("%T %v\n", err, err)
fmt.Printf("%T %v\n", context.Canceled, context.Canceled)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Committed unintentionally?

@aaronlehmann
Copy link
Collaborator

I think we should move forward with trying to port the raft code to use the new transport package. If we wait, it will become harder as the code diverges.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Dec 1, 2016

@aaronlehmann thanks! will do.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Dec 22, 2016

@aaronlehmann I've replaced the code with transport package. However, I blindly removed stuff with lastSeenHost and not sure that made proper replacement.
@cyli @dperny I'd love some review here. This PR is about separating transport of raft messages from Node structure.

@LK4D4 LK4D4 force-pushed the raft_transport branch 3 times, most recently from 8bc8c79 to 5b73465 Compare December 27, 2016 23:49
@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 7, 2017

Ok, it passes docker integration. Will fix comments at monday.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 9, 2017

@aaronlehmann @cyli I've split logic from restoreFromSnapshot.
@cyli I wasn't able to simplify stuff with map too much. Full Clear() would require some sort of synchronization with transport and also it would make short "quorum loss" for some functions that checks that. The solution would be to acquire members lock for whole snapshot operation, but I don't like how it looks.
Thanks!

@LK4D4 LK4D4 force-pushed the raft_transport branch 2 times, most recently from 5d49d47 to c047a61 Compare January 9, 2017 17:24
Copy link
Contributor

@cyli cyli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

m, ok := oldMembers[removedMember]
if !ok {
continue
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should call RemoveMember even if the member is not in oldMembers, because we need to keep track of the fact that this member ID was removed from the cluster.

@@ -215,38 +222,18 @@ func (n *Node) doSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
<-viewStarted
}

func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) error {
func (n *Node) restoreFromSnapshot(data []byte, forceNewCluster bool) (api.ClusterSnapshot, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forceNewCluster is not used anymore.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 9, 2017

@aaronlehmann Fixed, thanks! Also, I have 52 sequential passes of integration test on my machine so far.

@LK4D4 LK4D4 force-pushed the raft_transport branch 3 times, most recently from acee090 to b576841 Compare January 10, 2017 22:05
@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 10, 2017

@aaronlehmann PTAL. I've added address change handling.

type hostsStore struct {
mu sync.Mutex
hosts map[uint64]string
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for?

@@ -841,11 +935,15 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID
return n.configure(ctx, cc)
}

// updateMember submits a configuration change to change a member's address.
func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error {
// updateNodeBlocking runs synchronous job to update node addres in whole cluster.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: address

@@ -435,9 +497,36 @@ func (n *Node) Run(ctx context.Context) error {
// saveToStorage.
if !raft.IsEmptySnap(rd.Snapshot) {
// Load the snapshot data into the store
if err := n.restoreFromSnapshot(rd.Snapshot.Data, false); err != nil {
snapCluster, err := n.clusterSnapshot(rd.Snapshot.Data)
if err != nil {
log.G(ctx).WithError(err).Error("failed to restore from snapshot")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this clear snapCluster? It seems bad to use it below if an error was returned.

if err := p.updateAddr(addr); err != nil {
return err
}
log.G(t.ctx).Debugf("peer %x updated to address %s, it will be used if old failed", id, addr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my testing, with a three node cluster that has one node down, I'm seeing this log line every second in each of the two remaining managers' logs (referring to the other remaining manager in each case). This doesn't seem right because neither address has changed.

@aaronlehmann
Copy link
Collaborator

I tested the address change detection and it seems to work, except for the spammy log message.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 11, 2017

@aaronlehmann I've fixed message and other your comments. Thanks for review and testing!

Copy link
Collaborator

@aaronlehmann aaronlehmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

This package is separate grpc transport layer for raft package. Before we
used membership package + one very big method in raft package.

Signed-off-by: Alexander Morozov <[email protected]>
@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 12, 2017

Ok, I'm trying last time with docker and then merging.

@LK4D4
Copy link
Contributor Author

LK4D4 commented Jan 12, 2017

Only TestSwarmNetworkPlugin fails which is expected after #1856

@LK4D4 LK4D4 merged commit 50f82a9 into moby:master Jan 12, 2017
@LK4D4 LK4D4 deleted the raft_transport branch January 12, 2017 18:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants